package requestmgr

import (
	"context"
	"fmt"
	"slices"
	"strconv"
	"strings"
	"time"

	"github.com/pkg/errors"
	imgCVEDataStore "github.com/stackrox/rox/central/cve/image/v2/datastore"
	deploymentDataStore "github.com/stackrox/rox/central/deployment/datastore"
	imgDataStore "github.com/stackrox/rox/central/image/datastore"
	imgV2DataStore "github.com/stackrox/rox/central/imagev2/datastore"
	"github.com/stackrox/rox/central/reprocessor"
	"github.com/stackrox/rox/central/sensor/service/connection"
	views "github.com/stackrox/rox/central/views/imagecve"
	"github.com/stackrox/rox/central/vulnmgmt/vulnerabilityrequest/cache"
	"github.com/stackrox/rox/central/vulnmgmt/vulnerabilityrequest/common"
	vulnReqDataStore "github.com/stackrox/rox/central/vulnmgmt/vulnerabilityrequest/datastore"
	"github.com/stackrox/rox/central/vulnmgmt/vulnerabilityrequest/utils"
	"github.com/stackrox/rox/central/vulnmgmt/vulnerabilityrequest/validator"
	v1 "github.com/stackrox/rox/generated/api/v1"
	"github.com/stackrox/rox/generated/internalapi/central"
	"github.com/stackrox/rox/generated/storage"
	"github.com/stackrox/rox/pkg/concurrency"
	"github.com/stackrox/rox/pkg/errorhelpers"
	"github.com/stackrox/rox/pkg/errox"
	"github.com/stackrox/rox/pkg/features"
	"github.com/stackrox/rox/pkg/grpc/authn"
	"github.com/stackrox/rox/pkg/protoconv"
	"github.com/stackrox/rox/pkg/sac"
	"github.com/stackrox/rox/pkg/sac/resources"
	"github.com/stackrox/rox/pkg/search"
	deploymentOptionsMap "github.com/stackrox/rox/pkg/search/options/deployments"
	"github.com/stackrox/rox/pkg/set"
	"github.com/stackrox/rox/pkg/stringutils"
	"github.com/stackrox/rox/pkg/sync"
	"github.com/stackrox/rox/pkg/uuid"
	"golang.org/x/sync/semaphore"
)

var (
	// batchSize caps how many vulnerability request IDs are fetched per datastore call
	// while the request caches are being populated (see buildCache).
	batchSize = 1000

	// allAccessCtx is an unrestricted context used for internal lookups and updates.
	// allVulnApproverAccessSac gives the processor access as an approver (read and read-write on the
	// VulnerabilityManagementApprovals resource) so that it can properly expire requests.
	allAccessCtx             = sac.WithAllAccess(context.Background())
	allVulnApproverAccessSac = sac.WithGlobalAccessScopeChecker(context.Background(),
		sac.AllowFixedScopes(
			sac.AccessModeScopeKeys(storage.Access_READ_ACCESS, storage.Access_READ_WRITE_ACCESS),
			sac.ResourceScopeKeys(resources.VulnerabilityManagementApprovals)))

	// clusterIDField is the deployment search field for the cluster ID. Its field path is used to read
	// the highlighted cluster ID out of deployment search results.
	clusterIDField = deploymentOptionsMap.OptionsMap.MustGet(search.ClusterID.String())
)

// managerImpl drives the vulnerability request workflow: it creates, approves, denies, cancels and
// updates requests, applies or reverts their effect on image vulnerabilities, and runs the background
// processors that retire expired deferrals.
//
// Field notes:
//   - activeReqCache holds approved requests that are being enforced; pendingReqCache holds requests
//     (or request updates) that still await a decision.
//   - upsertSem is acquired by Create and Approve so that they run one at a time.
//   - seqNumLock guards lastKnownSeqNumInfo.
//   - stopper is used to stop, and wait for, the background deferral processor; the two ticker durations
//     are the periods at which the timed and the fixable-CVE deferral checks are re-run.
type managerImpl struct {
	deployments     deploymentDataStore.DataStore
	images          imgDataStore.DataStore
	imagesV2        imgV2DataStore.DataStore
	imageCVEs       imgCVEDataStore.DataStore
	vulnReqs        vulnReqDataStore.DataStore
	connManager     connection.Manager
	reprocessor     reprocessor.Loop
	activeReqCache  cache.VulnReqCache
	pendingReqCache cache.VulnReqCache
	// Stores the UTC month and seq num for that month for most recently created request, if any.
	lastKnownSeqNumInfo *monthSeqNumPair

	revertTimedDeferralsTickerDuration      time.Duration
	revertFixableCVEDeferralsTickerDuration time.Duration

	stopper      concurrency.Stopper
	upsertSem    *semaphore.Weighted
	seqNumLock   sync.Mutex
	imageCVEView views.CveView
}

// monthSeqNumPair remembers the month of the most recently named request together with the
// sequence number that was handed out within that month. The sequence restarts when the month changes
// (see requestName).
type monthSeqNumPair struct {
	lastCreatedMonth time.Month
	seqNum           int
}

// Start warms the request caches, restores the last used sequence number and launches the
// background processor that retires expired deferrals. A cache build failure is logged and does
// not prevent the manager from starting.
func (m *managerImpl) Start() {
	cacheErr := m.buildCache()
	if cacheErr != nil {
		log.Errorf("Could not build vulnerability request cache. Vulnerability snoozing and unsnoozing may not work correctly: %v", cacheErr)
	}

	m.reconcileLastSeqNum()

	go m.runExpiredDeferralsProcessor()
}

// Stop requests the background processor to stop and blocks until it has reported that it stopped.
// The result of the wait is intentionally discarded.
func (m *managerImpl) Stop() {
	client := m.stopper.Client()
	client.Stop()
	_ = client.Stopped().Wait()
}

// Create validates and stores a new vulnerability request.
//
// It returns an errox.InvalidArgs error when validation fails and an errox.AlreadyExists error when
// an enforced request with a matching scope already covers one of the requested CVEs. On success the
// request receives a fresh ID and name and, if it is pending, is added to the pending cache.
func (m *managerImpl) Create(ctx context.Context, req *storage.VulnerabilityRequest) error {
	validationErr := validator.ValidateNewSuppressVulnRequest(req)
	if validationErr != nil {
		return errors.Wrap(errox.InvalidArgs, validationErr.Error())
	}

	// Requests are processed one at a time: otherwise another request could be stored between the
	// validation below and the database write.
	if err := m.upsertSem.Acquire(ctx, 1); err != nil {
		return errors.Wrap(err, "creating vulnerability request")
	}
	defer m.upsertSem.Release(1)

	// Only one request may exist for the exact same CVE and scope _object_ combination, regardless of
	// the target state (false positive or deferral). A different request whose scope merely evaluates
	// to the same set is allowed, as long as it is not for the same CVE.
	requestedCVEs := req.GetCves().GetCves()

	// Look up the enforced requests for the requested CVEs.
	enforcedReqs, err := m.vulnReqs.SearchRawRequests(ctx, utils.GetEnforcedRequestsV1Query(requestedCVEs...))
	if err != nil {
		return errors.Wrap(err, "could not search for other vulnerability requests")
	}

	matchIdx := utils.FirstIndexMatchingScope(req, enforcedReqs)
	if matchIdx != -1 {
		conflicting := enforcedReqs[matchIdx]
		overlap := set.NewStringSet(requestedCVEs...).
			Intersect(set.NewStringSet(conflicting.GetCves().GetCves()...)).
			AsSlice()

		var errPrefix string
		if len(overlap) == 1 {
			errPrefix = fmt.Sprintf("CVE %s is", overlap[0])
		} else {
			errPrefix = fmt.Sprintf("CVEs %s are", overlap)
		}
		return errors.Wrapf(errox.AlreadyExists,
			"%s already covered by request %s for the scope", errPrefix, conflicting.GetId())
	}

	m.setIDAndName(req)

	if err := m.vulnReqs.AddRequest(ctx, req); err != nil {
		return errors.Wrap(err, "could not create vulnerability request")
	}
	if utils.IsPending(req) {
		m.pendingReqCache.Add(req)
	}
	return nil
}

// Approve marks the request with the given id as approved, auto-declines pending requests and pending
// update requests whose scope is covered by it, and then enforces the approved request.
//
// A non-empty comment is required; otherwise an errox.InvalidArgs error is returned. The approved
// request is returned on success.
func (m *managerImpl) Approve(ctx context.Context, id string, reqParams *common.VulnRequestParams) (*storage.VulnerabilityRequest, error) {
	if reqParams == nil || reqParams.Comment == "" {
		return nil, errors.Wrap(errox.InvalidArgs, "comment must be provided")
	}

	// Approvals are serialized with request creation and with each other. An approved request is
	// enforced immediately, so a concurrent creation could otherwise validate against the state
	// before this approval and write after it.
	if err := m.upsertSem.Acquire(ctx, 1); err != nil {
		return nil, errors.Wrapf(err, "approving vulnerability request %s", id)
	}
	defer m.upsertSem.Release(1)

	approved, err := m.vulnReqs.UpdateRequestStatus(ctx, id, reqParams.Comment, storage.RequestStatus_APPROVED)
	if err != nil {
		return nil, errors.Wrapf(err, "approving vulnerability request %s", id)
	}

	// Collect the pending requests and pending update requests for the same CVEs.
	approvedCVEs := approved.GetCves().GetCves()
	pendingQuery := search.DisjunctionQuery(
		utils.GetPendingRequestsV1Query(approvedCVEs...),
		utils.GetPendingUpdateRequestsV1Query(approvedCVEs...),
	)
	pendingReqs, err := m.vulnReqs.SearchRawRequests(ctx, pendingQuery)
	if err != nil {
		return nil, errors.Wrap(err, "could not search for other vulnerability requests")
	}

	// Decline every pending request whose scope is covered, using the identity of the approving user.
	for _, covered := range utils.RequestsWithCoveredScope(approved, pendingReqs) {
		prefix := "Vulnerability request"
		if covered.GetStatus() == storage.RequestStatus_APPROVED_PENDING_UPDATE {
			prefix = "Vulnerability update request"
		}
		denyParams := &common.VulnRequestParams{
			Comment: fmt.Sprintf("%s %s was auto-declined when request %s covering one or more requested CVEs was approved", prefix, covered.GetId(), id),
		}
		if _, err := m.Deny(ctx, covered.GetId(), denyParams); err != nil {
			return nil, err
		}
	}

	if err := m.SnoozeVulnerabilityOnRequest(ctx, approved); err != nil {
		return nil, errors.Wrapf(err, "enforcing approved vulnerability request %s", id)
	}
	return approved, nil
}

// Delete removes the request with the given id from the datastore and from the pending cache.
// The active cache is left untouched: active requests cannot be deleted, only pending requests and
// pending request updates can.
func (m *managerImpl) Delete(ctx context.Context, id string) error {
	err := m.vulnReqs.RemoveRequest(ctx, id)
	if err == nil {
		m.pendingReqCache.Remove(id)
	}
	return err
}

// Deny marks the request with the given id as denied and drops it from the pending cache.
//
// A non-empty comment is required; a nil reqParams or an empty comment yields an errox.InvalidArgs
// error. The denied request is returned on success.
func (m *managerImpl) Deny(ctx context.Context, id string, reqParams *common.VulnRequestParams) (*storage.VulnerabilityRequest, error) {
	// Guard against nil in the same way Approve does; dereferencing a nil reqParams would panic.
	if reqParams == nil || reqParams.Comment == "" {
		return nil, errors.Wrap(errox.InvalidArgs, "comment must be provided")
	}
	req, err := m.vulnReqs.UpdateRequestStatus(ctx, id, reqParams.Comment, storage.RequestStatus_DENIED)
	if err != nil {
		return nil, errors.Wrapf(err, "denying vulnerability request %s", id)
	}
	// Request (Request update) that is not approved can only be in pending cache.
	m.pendingReqCache.Remove(req.GetId())
	// No need to unsnooze or snooze the vulns. A denial state is reached only from pending state which has
	// no effect on policy or risk workflow.
	return req, nil
}

// Undo marks the request with the given id inactive and reverts its enforcement.
// The request parameters are ignored. The updated request is returned on success.
func (m *managerImpl) Undo(ctx context.Context, id string, _ *common.VulnRequestParams) (*storage.VulnerabilityRequest, error) {
	undone, err := m.vulnReqs.MarkRequestInactive(ctx, id, "[System Generated] Request undone")
	if err == nil {
		err = m.UnSnoozeVulnerabilityOnRequest(ctx, undone)
	}
	if err != nil {
		return nil, errors.Wrapf(err, "undoing vulnerability request %s", id)
	}
	return undone, nil
}

// Cancel marks the vulnerability exception with the given id inactive and reverts its enforcement.
//
// The system-generated comment names the user found in ctx; when no user can be determined, this is
// logged and the comment is stored without a user name. An empty id yields an errox.InvalidArgs error.
func (m *managerImpl) Cancel(ctx context.Context, id string) (*storage.VulnerabilityRequest, error) {
	if id == "" {
		return nil, errox.InvalidArgs.CausedBy("vulnerability exception ID must be specified")
	}

	comment := "[System Generated] Vulnerability exception canceled"
	if user := authn.UserFromContext(ctx); user != nil {
		comment = fmt.Sprintf("%s by %s", comment, user.GetName())
	} else {
		log.Errorf("failed to determine user for vulnerability exception %s", id)
	}

	canceled, err := m.vulnReqs.MarkRequestInactive(ctx, id, comment)
	if err != nil {
		return nil, errors.Wrapf(err, "canceling vulnerability exception %s", id)
	}

	// Lift the enforcement of the canceled exception.
	err = m.UnSnoozeVulnerabilityOnRequest(ctx, canceled)
	if err != nil {
		return nil, errors.Wrapf(err, "removing vulnerability exception %s enforcement", id)
	}
	return canceled, nil
}

// UpdateExpiry records a new expiry for the request with the given id and adds the resulting request
// to the pending cache.
//
// The id, the parameters and the expiry must all be provided; otherwise an errox.InvalidArgs error is
// returned. The vulnerabilities are neither snoozed nor unsnoozed here: the update stays pending until
// it is approved, and an already approved original configuration remains in effect meanwhile.
func (m *managerImpl) UpdateExpiry(ctx context.Context, id string, reqParams *common.VulnRequestParams) (*storage.VulnerabilityRequest, error) {
	switch {
	case id == "":
		return nil, errox.InvalidArgs.CausedBy("vulnerability exception ID must be specified")
	case reqParams == nil:
		return nil, errox.InvalidArgs.CausedBy("vulnerability exception cannot be nil")
	case reqParams.Expiry == nil:
		// In v1, only deferral requests can be updated.
		return nil, errors.Wrap(errox.InvalidArgs, "nothing to update for request - at least expiry must be provided")
	}

	updated, err := m.vulnReqs.UpdateRequestExpiry(ctx, id, reqParams.Comment, reqParams.Expiry)
	if err != nil {
		return nil, errors.Wrapf(err, "updating vulnerability request %s", id)
	}

	m.pendingReqCache.Add(updated)
	return updated, nil
}

// UpdateException applies the given update to the vulnerability exception with the given id and adds
// the resulting request to the pending cache.
//
// An empty id yields an errox.InvalidArgs error; a validation failure of the update is returned as is.
// The vulnerabilities are neither snoozed nor unsnoozed here, because the update is not enforced until
// it is approved.
func (m *managerImpl) UpdateException(ctx context.Context, id string, update *common.UpdateRequest) (*storage.VulnerabilityRequest, error) {
	if id == "" {
		return nil, errox.InvalidArgs.CausedBy("vulnerability exception ID must be specified")
	}

	validationErr := validator.ValidateUpdate(update)
	if validationErr != nil {
		return nil, validationErr
	}

	updated, err := m.vulnReqs.UpdateRequest(ctx, id, update)
	if err != nil {
		return nil, errors.Wrapf(err, "updating vulnerability exception %s", id)
	}

	m.pendingReqCache.Add(updated)
	return updated, nil
}

// SnoozeVulnerabilityOnRequest snoozes the CVEs of the request for the scope the request specifies.
// Snoozed vulns won't result in a policy violation nor will they be included in risk calculation.
//
// The request must be approved and not expired, otherwise an error is returned. The request is moved
// from the pending cache to the active cache, the vulnerability state is updated on every matching
// image, and reprocessing of the affected images and deployments is started in the background.
func (m *managerImpl) SnoozeVulnerabilityOnRequest(_ context.Context, request *storage.VulnerabilityRequest) error {
	requestID := request.GetId()
	if request.GetStatus() != storage.RequestStatus_APPROVED || request.GetExpired() {
		return errors.Errorf("vulnerability request %s not approved or expired", requestID)
	}

	// The active cache is updated before any image lookup because the request may target images that
	// have not been detected in the system.
	m.activeReqCache.Add(request)
	m.pendingReqCache.Remove(requestID)

	// Only the scope is used to find images; whether an image actually has the CVE is validated by
	// the image-cve datastore.
	imageIDs, err := m.getImagesIDsForVulnRequest(request)
	if err != nil {
		return errors.Wrapf(err, "could not fetch images matching vulnerability exception %s", requestID)
	}
	if len(imageIDs) == 0 {
		return nil
	}

	cves := request.GetCves().GetCves()
	for _, imageID := range imageIDs {
		if !features.FlattenImageData.Enabled() {
			image, exists, err := m.images.GetImageMetadata(allAccessCtx, imageID)
			if err != nil {
				return errors.Wrapf(err, "could not fetch metadata for image %s", imageID)
			}
			if !exists {
				continue
			}
			// Resolve the state each CVE should effectively have for this image.
			name := image.GetName()
			effectiveStates := m.activeReqCache.GetEffectiveVulnStateForImage(cves, name.GetRegistry(), name.GetRemote(), name.GetTag())
			for _, cve := range cves {
				if err := m.images.UpdateVulnerabilityState(allAccessCtx, cve, []string{imageID}, effectiveStates[cve]); err != nil {
					return errors.Wrapf(err, "could not apply exception to vulnerabilities in image %s for request %s", imageID, requestID)
				}
			}
			continue
		}

		image, exists, err := m.imagesV2.GetImageMetadata(allAccessCtx, imageID)
		if err != nil {
			return errors.Wrapf(err, "could not fetch image metadata for image %s", imageID)
		}
		if !exists {
			continue
		}
		// Resolve the state each CVE should effectively have for this image.
		name := image.GetName()
		effectiveStates := m.activeReqCache.GetEffectiveVulnStateForImage(cves, name.GetRegistry(), name.GetRemote(), name.GetTag())
		for _, cve := range cves {
			if err := m.imagesV2.UpdateVulnerabilityState(allAccessCtx, cve, []string{imageID}, effectiveStates[cve]); err != nil {
				return errors.Wrapf(err, "could not apply exception to vulnerabilities in image %s for request %s", imageID, requestID)
			}
		}
	}

	go m.reprocessAffectedEntities(requestID, imageIDs...)
	return nil
}

// UnSnoozeVulnerabilityOnRequest unsnoozes the CVEs of the request for the scope the request specifies,
// unless another request that is still active keeps a CVE snoozed.
//
// The request is removed from both caches, the vulnerability state of every matching image is set to
// whatever the remaining active requests dictate, and reprocessing of the affected images and
// deployments is started in the background.
func (m *managerImpl) UnSnoozeVulnerabilityOnRequest(_ context.Context, request *storage.VulnerabilityRequest) error {
	requestID := request.GetId()

	// The pending cache is visited as well: a deferral expiry update puts the request into
	// APPROVED_PENDING_UPDATE and adds an entry there, which must be removed too.
	m.pendingReqCache.Remove(requestID)
	m.activeReqCache.Remove(requestID)

	// Images are looked up by scope rather than by image+cve combination; whether an image actually
	// has the CVE is validated by the image-cve datastore.
	imageIDs, err := m.getImagesIDsForVulnRequest(request)
	if err != nil {
		return errors.Wrapf(err, "could not fetch images matching vulnerability exception %s", requestID)
	}
	if len(imageIDs) == 0 {
		return nil
	}

	cves := request.GetCves().GetCves()
	for _, imageID := range imageIDs {
		if !features.FlattenImageData.Enabled() {
			image, exists, err := m.images.GetImageMetadata(allAccessCtx, imageID)
			if err != nil {
				return errors.Wrapf(err, "could not fetch metadata for image %s", imageID)
			}
			if !exists {
				continue
			}
			// Resolve the state each CVE should effectively have for this image.
			name := image.GetName()
			effectiveStates := m.activeReqCache.GetEffectiveVulnStateForImage(cves, name.GetRegistry(), name.GetRemote(), name.GetTag())
			for _, cve := range cves {
				if err := m.images.UpdateVulnerabilityState(allAccessCtx, cve, []string{imageID}, effectiveStates[cve]); err != nil {
					return errors.Wrapf(err, "could not revert exception applied to vulnerabilities in image %s for request %s", imageID, requestID)
				}
			}
			continue
		}

		image, exists, err := m.imagesV2.GetImageMetadata(allAccessCtx, imageID)
		if err != nil {
			return errors.Wrapf(err, "could not fetch metadata for image %s", imageID)
		}
		if !exists {
			continue
		}
		// Resolve the state each CVE should effectively have for this image.
		name := image.GetName()
		effectiveStates := m.activeReqCache.GetEffectiveVulnStateForImage(cves, name.GetRegistry(), name.GetRemote(), name.GetTag())
		for _, cve := range cves {
			if err := m.imagesV2.UpdateVulnerabilityState(allAccessCtx, cve, []string{imageID}, effectiveStates[cve]); err != nil {
				return errors.Wrapf(err, "could not revert exception applied to vulnerabilities in image %s for request %s", imageID, requestID)
			}
		}
	}

	go m.reprocessAffectedEntities(requestID, imageIDs...)
	return nil
}

// reprocessAffectedEntities triggers reprocessing of the given images and then, in a separate
// goroutine, of the deployments that use them. A failure of the image step is only logged.
//
// Image risk is not recalculated here: invalidating the Secured Cluster image cache leads to an image
// pull cycle, which in turn triggers the image risk calculation.
func (m *managerImpl) reprocessAffectedEntities(requestID string, affectedImages ...string) {
	imageErr := m.reprocessImage(requestID, affectedImages...)
	if imageErr != nil {
		log.Errorf("Could not fetch Secured Cluster image cache keys in response to vuln request %q: %v", requestID, imageErr)
	}

	go m.reprocessDeployments(requestID, affectedImages...)
}

// reprocessDeployments asks every connected cluster to reprocess its deployments that use one of the
// affected images and schedules a risk reprocessing for all of those deployments.
//
// All failures are logged rather than returned; the reprocessing happens anyway at the next
// reprocessing interval. Clusters without a connection are skipped for the sensor message, but their
// deployments are still passed to the risk reprocessor.
func (m *managerImpl) reprocessDeployments(requestID string, affectedImages ...string) {
	depsByCluster, err := m.getAffectedDeployments(affectedImages...)
	if err != nil {
		log.Errorf("Cannot reprocess deployments. "+
			"Could not get deployment affected by vuln request %q: %v", requestID, err)
		return
	}

	var allDeps []string
	for cluster, deps := range depsByCluster {
		allDeps = append(allDeps, deps...)
		conn := m.connManager.GetConnection(cluster)
		if conn == nil {
			continue
		}
		if err := conn.InjectMessage(allAccessCtx, getReprocessDeploymentMsg(deps...)); err != nil {
			// Include the underlying error so that the cause of the failure is not lost.
			log.Errorf("Could not send request to reprocess deployments affected by vuln request %q: %v", requestID, err)
		}
	}
	// Reprocessor throttles the requests to reprocess deployments once every the reprocessing interval.
	m.reprocessor.ReprocessRiskForDeployments(allDeps...)
}

// reprocessImage broadcasts a message to the sensors that invalidates their image cache entries for
// the given images. Images that cannot be found are skipped; a lookup error aborts the whole
// operation before anything is broadcast. The request ID argument is unused.
func (m *managerImpl) reprocessImage(_ string, affectedImages ...string) error {
	keys := make([]*central.InvalidateImageCache_ImageKey, 0, len(affectedImages))
	for _, imageID := range affectedImages {
		var fullName string
		if features.FlattenImageData.Enabled() {
			img, exists, err := m.imagesV2.GetImage(allAccessCtx, imageID)
			if err != nil {
				return errors.Wrap(err, "could not get image for reprocessing")
			}
			if !exists {
				continue
			}
			fullName = img.GetName().GetFullName()
		} else {
			img, exists, err := m.images.GetImage(allAccessCtx, imageID)
			if err != nil {
				return errors.Wrap(err, "could not get image for reprocessing")
			}
			if !exists {
				continue
			}
			fullName = img.GetName().GetFullName()
		}
		keys = append(keys, &central.InvalidateImageCache_ImageKey{
			ImageId:       imageID,
			ImageFullName: fullName,
		})
	}

	invalidation := &central.InvalidateImageCache{ImageKeys: keys}
	m.connManager.BroadcastMessage(&central.MsgToSensor{
		Msg: &central.MsgToSensor_InvalidateImageCache{InvalidateImageCache: invalidation},
	})
	return nil
}

// getAffectedDeployments returns the IDs of the deployments that use any of the given images, grouped
// by cluster ID. It returns a nil map when no deployment matches. Deployments whose search result
// carries no cluster ID are logged and left out.
func (m *managerImpl) getAffectedDeployments(affectedImages ...string) (map[string][]string, error) {
	imageField := search.ImageSHA
	if features.FlattenImageData.Enabled() {
		imageField = search.ImageID
	}

	byImage := search.NewQueryBuilder().AddExactMatches(imageField, affectedImages...).ProtoQuery()
	// The highlighted wildcard match makes the cluster ID available in each result.
	withClusterID := search.NewQueryBuilder().AddStringsHighlighted(search.ClusterID, search.WildcardString).ProtoQuery()

	results, err := m.deployments.SearchDeployments(allAccessCtx, search.ConjunctionQuery(byImage, withClusterID))
	if err != nil {
		return nil, errors.Wrap(err, "could not get deployment results")
	}
	if len(results) == 0 {
		return nil, nil
	}

	grouped := make(map[string][]string)
	for _, result := range results {
		matches := result.GetFieldToMatches()[clusterIDField.GetFieldPath()].GetValues()
		if len(matches) == 0 {
			log.Errorf("No cluster ID found in fields for deployment %q", result.GetId())
			continue
		}
		clusterID := matches[0]
		grouped[clusterID] = append(grouped[clusterID], result.GetId())
	}
	return grouped, nil
}

// getImagesIDsForVulnRequest returns the IDs of the images that match the scope of the request.
// The image datastore that is searched depends on the FlattenImageData feature flag.
func (m *managerImpl) getImagesIDsForVulnRequest(request *storage.VulnerabilityRequest) ([]string, error) {
	scopeQuery, err := utils.GetAffectedImagesQuery(request, nil)
	if err != nil {
		return nil, err
	}

	var (
		matches   []search.Result
		searchErr error
	)
	if !features.FlattenImageData.Enabled() {
		matches, searchErr = m.images.Search(allAccessCtx, scopeQuery)
	} else {
		matches, searchErr = m.imagesV2.Search(allAccessCtx, scopeQuery)
	}
	if searchErr != nil {
		return nil, searchErr
	}
	return search.ResultsToIDs(matches), nil
}

// expireDeferrals marks each of the given requests inactive and reverts its enforcement.
// Processing continues past failures; all errors are collected and returned together.
//
// NOTE: a vulnerability may remain deferred afterwards if another request still covers it (e.g. this
// one was image scoped but a global one still exists).
func (m *managerImpl) expireDeferrals(deferrals []*storage.VulnerabilityRequest) error {
	errs := errorhelpers.NewErrorList("re-observing expired deferrals")
	for _, deferral := range deferrals {
		id := deferral.GetId()
		// Marking the request inactive is all that is needed to have it re-observed.
		_, err := m.vulnReqs.MarkRequestInactive(allVulnApproverAccessSac, id, "[System Generated] Request expired")
		if err != nil {
			errs.AddWrapf(err, "marking as inactive request %s", id)
		} else if err := m.UnSnoozeVulnerabilityOnRequest(allVulnApproverAccessSac, deferral); err != nil {
			errs.AddWrapf(err, "unsnoozing vulns for request %s", id)
		}
	}
	return errs.ToError()
}

// getExpiredDeferrals returns the approved (or approved with a pending update) requests that are not
// yet flagged as expired but whose expiry time lies before the current date. It returns nil when there
// are none.
func (m *managerImpl) getExpiredDeferrals() ([]*storage.VulnerabilityRequest, error) {
	expiryBound := "<" + time.Now().Format("01/02/2006 MST")

	pastDue := search.NewQueryBuilder().AddGenericTypeLinkedFields(
		[]search.FieldLabel{search.ExpiredRequest, search.RequestExpiryTime},
		[]interface{}{false, expiryBound},
	).ProtoQuery()
	enforced := search.NewQueryBuilder().AddExactMatches(
		search.RequestStatus,
		storage.RequestStatus_APPROVED.String(),
		storage.RequestStatus_APPROVED_PENDING_UPDATE.String(),
	).ProtoQuery()

	expired, err := m.vulnReqs.SearchRawRequests(allVulnApproverAccessSac, search.ConjunctionQuery(pastDue, enforced))
	if err != nil {
		return nil, err
	}
	if len(expired) == 0 {
		return nil, nil
	}
	return expired, nil
}

// revertPastDueDeferralExceptions retires the deferral requests whose expiry time has passed and
// re-observes their vulnerabilities. It does nothing once the manager has stopped; the stop state is
// checked before the lookup and again before the requests are expired. Failures are logged.
func (m *managerImpl) revertPastDueDeferralExceptions() {
	isStopped := func() bool {
		return m.stopper.Client().Stopped().IsDone()
	}
	if isStopped() {
		return
	}

	expired, err := m.getExpiredDeferrals()
	if err != nil {
		log.Errorf("error retrieving expired deferral requests for reprocessing: %v", err)
		return
	}
	if len(expired) == 0 || isStopped() {
		return
	}

	err = m.expireDeferrals(expired)
	if err != nil {
		log.Errorf("Failed to retire expired deferral requests and re-observe associated vulnerabilities with error(s): %+v", err)
		return
	}
	log.Infof("Completed retiring %d expired deferral requests and re-observing deferred vulnerabilities", len(expired))
}

// getFixableDeferrals returns the enforced, non-expired deferral requests that expire on CVE
// fixability and whose condition is now met.
//
// For every candidate the fixable image CVEs within the request scope are counted. An ANY_CVE_FIXABLE
// request qualifies when the count is positive; an ALL_CVE_FIXABLE request qualifies when the count
// equals the number of CVEs in the request.
func (m *managerImpl) getFixableDeferrals() ([]*storage.VulnerabilityRequest, error) {
	countCtx := sac.WithAllAccess(context.Background())

	candidatesQuery := search.ConjunctionQuery(
		search.NewQueryBuilder().
			AddBools(search.ExpiredRequest, false).
			AddExactMatches(search.ExpiryType, storage.RequestExpiry_ANY_CVE_FIXABLE.String(), storage.RequestExpiry_ALL_CVE_FIXABLE.String()).
			AddExactMatches(search.RequestStatus, storage.RequestStatus_APPROVED.String(), storage.RequestStatus_APPROVED_PENDING_UPDATE.String()).
			ProtoQuery(),
	)

	candidates, err := m.vulnReqs.SearchRawRequests(allVulnApproverAccessSac, candidatesQuery)
	if err != nil {
		return nil, err
	}
	if len(candidates) == 0 {
		return nil, nil
	}

	var fixable []*storage.VulnerabilityRequest
	for _, candidate := range candidates {
		cves := candidate.GetCves().GetCves()
		expiryType := candidate.GetDeferralReq().GetExpiry().GetExpiryType()

		fixableCVEs := search.NewQueryBuilder().
			AddBools(search.Fixable, true).
			AddExactMatches(search.CVE, cves...).
			ProtoQuery()

		// Restrict the fixable CVE query to the images in the scope of the request.
		scopedQuery, err := utils.GetAffectedImagesQuery(candidate, fixableCVEs)
		if err != nil {
			return nil, err
		}

		count, err := m.imageCVEView.Count(countCtx, scopedQuery)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to count on image cves for request %q", candidate.GetId())
		}

		switch expiryType {
		case storage.RequestExpiry_ANY_CVE_FIXABLE:
			if count > 0 {
				fixable = append(fixable, candidate)
			}
		case storage.RequestExpiry_ALL_CVE_FIXABLE:
			if count == len(cves) {
				fixable = append(fixable, candidate)
			}
		}
	}

	return fixable, nil
}

// revertFixableCVEDeferralExceptions reverts the deferral requests whose fixability condition is now
// met. It does nothing once the manager has stopped; the stop state is checked before the lookup and
// again before the requests are expired. Failures are logged.
func (m *managerImpl) revertFixableCVEDeferralExceptions() {
	isStopped := func() bool {
		return m.stopper.Client().Stopped().IsDone()
	}
	if isStopped() {
		return
	}

	fixable, err := m.getFixableDeferrals()
	if err != nil {
		log.Errorf("Failed to determine if deferral exception can be reverted: %v", err)
		return
	}
	if len(fixable) == 0 || isStopped() {
		return
	}

	err = m.expireDeferrals(fixable)
	if err != nil {
		log.Errorf("Failed to revert vulnerability deferral exceptions: %+v", err)
		return
	}
	log.Infof("Reverted %d vulnerability deferral exceptions", len(fixable))
}

// runExpiredDeferralsProcessor is the background loop that periodically reverts deferrals. It runs
// both checks once right away, then re-runs each of them on its own ticker until a stop is requested.
// On exit it stops the tickers and reports that the processor has stopped.
func (m *managerImpl) runExpiredDeferralsProcessor() {
	defer m.stopper.Flow().ReportStopped()

	timedTicker := time.NewTicker(m.revertTimedDeferralsTickerDuration)
	defer timedTicker.Stop()

	fixableTicker := time.NewTicker(m.revertFixableCVEDeferralsTickerDuration)
	defer fixableTicker.Stop()

	// Do an initial pass without waiting for the first tick.
	go m.revertPastDueDeferralExceptions()
	go m.revertFixableCVEDeferralExceptions()

	for {
		select {
		case <-timedTicker.C:
			go m.revertPastDueDeferralExceptions()
		case <-fixableTicker.C:
			go m.revertFixableCVEDeferralExceptions()
		case <-m.stopper.Flow().StopRequested():
			return
		}
	}
}

// buildCache populates the active cache with the active approved requests and the pending cache with
// the active pending requests. The first failure aborts the build and is returned.
func (m *managerImpl) buildCache() error {
	// Active (approved) requests.
	activeResults, err := m.vulnReqs.Search(allAccessCtx, utils.GetActiveApprovedReqQuery())
	if err != nil {
		return errors.Wrap(err, "error retrieving keys from vuln request datastore")
	}
	err = buildCache(m.vulnReqs, m.activeReqCache, search.ResultsToIDs(activeResults)...)
	if err != nil {
		return err
	}

	// Pending requests.
	pendingResults, err := m.vulnReqs.Search(allAccessCtx, utils.GetActivePendingReqQuery())
	if err != nil {
		return errors.Wrap(err, "error retrieving keys from vuln request datastore")
	}
	err = buildCache(m.vulnReqs, m.pendingReqCache, search.ResultsToIDs(pendingResults)...)
	if err != nil {
		return err
	}

	log.Info("[STARTUP] Successfully cached all vulnerability requests")
	return nil
}

// reconcileLastSeqNum restores the month and sequence number of the most recently created request
// from the datastore, so that naming continues from there. When the lookup fails (logged) or no
// request exists, the in-memory state is left unchanged.
func (m *managerImpl) reconcileLastSeqNum() {
	// Fetch only the newest request: sort by creation time, descending, and limit to one result.
	q := search.EmptyQuery()
	q.Pagination = &v1.QueryPagination{
		Limit: 1,
		SortOptions: []*v1.QuerySortOption{
			{
				Field:    search.CreatedTime.String(),
				Reversed: true,
			},
		},
	}
	reqs, err := m.vulnReqs.SearchRawRequests(allAccessCtx, q)
	if err != nil {
		log.Errorf("could not search vulnerability requests to reconcile the sequence number: %v", err)
		return
	}

	if len(reqs) == 0 {
		return
	}
	latest := reqs[0]

	m.seqNumLock.Lock()
	defer m.seqNumLock.Unlock()

	// requestName tolerates a nil pair, so the field may not have been initialized; writing through a
	// nil pointer would panic.
	if m.lastKnownSeqNumInfo == nil {
		m.lastKnownSeqNumInfo = &monthSeqNumPair{}
	}

	// The month is taken in UTC so that it agrees with requestName, which also derives the month from
	// the UTC creation time.
	m.lastKnownSeqNumInfo.lastCreatedMonth = protoconv.ConvertTimestampToTimeOrNow(latest.GetCreatedAt()).UTC().Month()
	m.lastKnownSeqNumInfo.seqNum = getSeqNum(latest.GetName())
}

// setIDAndName assigns a fresh UUID and a generated name to the request. The name embeds the next
// sequence number, which is advanced under seqNumLock. A nil request is ignored.
func (m *managerImpl) setIDAndName(req *storage.VulnerabilityRequest) {
	if req == nil {
		return
	}

	req.Id = uuid.NewV4().String()

	m.seqNumLock.Lock()
	defer m.seqNumLock.Unlock()

	// requestName falls back to a throwaway pair when given nil, which would discard the advanced
	// sequence number and hand out the same number to every request. Make sure the state persists.
	if m.lastKnownSeqNumInfo == nil {
		m.lastKnownSeqNumInfo = &monthSeqNumPair{}
	}
	req.Name = requestName(req, m.lastKnownSeqNumInfo)
}

// buildCache loads the requests with the given IDs from the datastore in batches of batchSize and adds
// them to the cache, logging the progress after every batch. The first datastore error aborts the load.
func buildCache(vulnReqs vulnReqDataStore.DataStore, cache cache.VulnReqCache, ids ...string) error {
	total := len(ids)
	cached := 0
	for batch := range slices.Chunk(ids, batchSize) {
		reqs, err := vulnReqs.GetMany(allAccessCtx, batch)
		if err != nil {
			return err
		}
		cache.AddMany(reqs...)

		cached += len(batch)
		log.Infof("[STARTUP] Successfully cached %d/%d vulnerability requests", cached, total)
	}
	return nil
}

// getReprocessDeploymentMsg builds the sensor message that asks for the given deployments to be
// reprocessed.
func getReprocessDeploymentMsg(deps ...string) *central.MsgToSensor {
	payload := &central.ReprocessDeployment{DeploymentIds: deps}
	return &central.MsgToSensor{
		Msg: &central.MsgToSensor_ReprocessDeployment{ReprocessDeployment: payload},
	}
}

// requestName generates the name of a request from the requestor's short name, the UTC creation date
// (formatted as "060102") and the next sequence number, joined by common.VulnReqNameSeparator.
//
// The passed sequence state is advanced in place: the number restarts at 1 when the creation month
// differs from the last recorded month, and is incremented otherwise. A nil state is replaced by a
// temporary one, so the advance is not visible to the caller in that case. A nil request yields "".
func requestName(req *storage.VulnerabilityRequest, lastKnownSeqNumInfo *monthSeqNumPair) string {
	if req == nil {
		return ""
	}

	seqInfo := lastKnownSeqNumInfo
	if seqInfo == nil {
		seqInfo = &monthSeqNumPair{}
	}

	createdAt := protoconv.ConvertTimestampToTimeOrNow(req.GetCreatedAt()).UTC()
	if month := createdAt.Month(); month != seqInfo.lastCreatedMonth {
		seqInfo.lastCreatedMonth = month
		seqInfo.seqNum = 0
	}
	seqInfo.seqNum++

	nameParts := []string{
		getShortName(req.GetRequestor()),
		createdAt.Format("060102"),
		strconv.Itoa(seqInfo.seqNum),
	}
	return strings.Join(nameParts, common.VulnReqNameSeparator)
}

// getSeqNum extracts the sequence number from a request name, i.e. the number that follows the last
// common.VulnReqNameSeparator. It returns 0 when the name contains no separator or when the trailing
// part is not a valid number (the latter is logged).
func getSeqNum(requestName string) int {
	idx := strings.LastIndex(requestName, common.VulnReqNameSeparator)
	if idx == -1 {
		return 0
	}
	// Skip the whole separator rather than assuming it is exactly one byte long.
	seqNum, err := strconv.Atoi(requestName[idx+len(common.VulnReqNameSeparator):])
	if err != nil {
		log.Errorf("could not determine the vulnerability request sequence number: %v", err)
		// Atoi returns a saturated value for out-of-range input; do not propagate it.
		return 0
	}
	return seqNum
}

// getShortName returns the two-character initials of the user: the first character of the first and of
// the last space-separated word of the upper-cased user name. For a single-word name both initials come
// from that word. It returns common.DefaultUserShortName when the user is nil or the name has no
// non-empty word.
func getShortName(user *storage.SlimUser) string {
	if user == nil {
		return common.DefaultUserShortName
	}

	name := strings.ToUpper(user.GetName())
	parts := strings.Split(name, " ")
	for i := range parts {
		parts[i] = strings.TrimSpace(parts[i])
	}

	firstName := stringutils.FirstNonEmpty(parts...)
	lastName := stringutils.LastNonEmpty(parts...)
	if firstName == "" || lastName == "" {
		return common.DefaultUserShortName
	}

	// Take the first rune, not the first byte: indexing a string yields a single byte, which would
	// produce a garbled initial for names starting with a multi-byte (non-ASCII) character.
	firstInitial := []rune(firstName)[0]
	lastInitial := []rune(lastName)[0]
	return fmt.Sprintf("%c%c", firstInitial, lastInitial)
}
